Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/1.5.8/zenoh utransport impl #65

Merged

Conversation

gregmedd
Copy link
Contributor

@gregmedd gregmedd commented Jul 8, 2024

With patches applied to #54

Copy link
Contributor Author

@gregmedd gregmedd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stopping part way into this review. In addition to the comments recorded, the known list of TODO items includes:

Comment on lines +22 to +45
#define ZENOHCXX_ZENOHC
#include <zenoh.hxx>

namespace zenohc {

class OwnedQuery {
public:
OwnedQuery(const z_query_t& query) : _query(z_query_clone(&query)) {}

OwnedQuery(const OwnedQuery&) = delete;
OwnedQuery& operator=(const OwnedQuery&) = delete;

~OwnedQuery() { z_drop(&_query); }

Query loan() const { return z_loan(_query); }
bool check() const { return z_check(_query); }

private:
z_owned_query_t _query;
};

using OwnedQueryPtr = std::shared_ptr<OwnedQuery>;

} // namespace zenohc
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be relocated into the .cpp file. It's a detail specific to this implementation and does not need to be exposed externally.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is temporary workaround and it should be removed after switching to zenohcpp 1.0

: listener(listener), zenoh_key(zenoh_key) {}

bool operator==(const ListenerKey& other) const {
return listener == other.listener && zenoh_key == other.zenoh_key;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return listener == other.listener && zenoh_key == other.zenoh_key;
return (listener == other.listener) && (zenoh_key == other.zenoh_key);

This && might need to be changed or an additional equality operator for CallableConn only might need to be added, assuming the intent is for this key to be used as part of the callback cleanup process.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe that's not the intent here - the operator< sorts on both

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class needs only if the cleanupListener will receive sink/source filters like in rust, but currently it receives only listener, and I still haven't received an answer why this is so.

Comment on lines +112 to +115
using RpcCallbackMap = std::map<UuriKey, CallableConn>;
using SubscriberMap = std::map<ListenerKey, zenoh::Subscriber>;
using QueryableMap = std::map<ListenerKey, zenoh::Queryable>;
using QueryMap = std::map<std::string, zenoh::OwnedQueryPtr>;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some clarification on what these are would be helpful.

The types are also only used once and not particularly long, so the using statements could probably be removed.

Comment on lines +149 to +144
static std::string toZenohKeyString(
const std::string& default_authority_name, const v1::UUri& source,
const std::optional<v1::UUri>& sink);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be documented.

Comment on lines +154 to +157
static v1::UStatus uError(v1::UCode code, std::string_view message);

static std::vector<std::pair<std::string, std::string>>
uattributesToAttachment(const v1::UAttributes& attributes);

static v1::UAttributes attachmentToUAttributes(
const zenoh::AttachmentView& attachment);

static zenoh::Priority mapZenohPriority(v1::UPriority upriority);

static v1::UMessage sampleToUMessage(const zenoh::Sample& sample);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could probably be in the anonymous namespace of the .cpp file.

Comment on lines +126 to +127
// TODO: more efficient way?
res.ParseFromString(std::string(attachment_vec[1].as_string_view()));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improvements should be tracked as issues.

src/ZenohUTransport.cpp Show resolved Hide resolved

UMessage ZenohUTransport::sampleToUMessage(const Sample& sample) {
UAttributes attributes;
if (sample.get_attachment().check()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is check()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the check fails, the UMessage would have blank attributes. That doesn't seem to be valid. The return type should probably be Expected<UMessage, ...> so errors can be returned.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check() means that the attachment exists in the sample

Comment on lines 160 to 161
message.set_payload(payload);
message.set_allocated_attributes(&attributes);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set_allocated_attributes is not safe. Use assignment on the dereferenced mutable attributes:

Suggested change
message.set_payload(payload);
message.set_allocated_attributes(&attributes);
message.set_payload(std::move(payload));
(*message.mutable_attributes()) = std::move(attributes);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it was already fixed in my PR

session_(expect<Session>(open(
std::move(expect(config_from_file(configFile.string().c_str())))))) {}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be easier to read as a function in the anonymous namespace.

Suggested change
session_(expect<Session>(open(
std::move(expect(config_from_file(configFile.string().c_str())))))) {}
session_(openSession(configFile)) {}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the error condition for the expect<Session> checked? That error should probably be an exception.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"expect" raises the exception

Most of the basic functionality is present, but there are still several
TODO items remaining:

* Rework invoke_nonblock_callback call
* Finish all TODO comments in code files
* Implement cleanupListener
* Add and finish tests
* Fix anything that fails tests
* Switch to Zenoh 1.0 when available
@gregmedd gregmedd force-pushed the feature/1.5.8/zenoh-utransport-impl branch from c53133f to 3975338 Compare July 8, 2024 20:47
* Remove use to gtest outside of test tree.
* Address error message from zenohcpp library.
* Switch to our temporary zenohcpp conan recipe.
@gregmedd gregmedd force-pushed the feature/1.5.8/zenoh-utransport-impl branch from 3975338 to cefbca4 Compare July 25, 2024 00:50
@gregmedd gregmedd marked this pull request as ready for review July 25, 2024 00:54
@gregmedd
Copy link
Contributor Author

gregmedd commented Jul 25, 2024

This is an early draft of the zenoh utransport implementation. It is known to contain bugs and not be fully functional.

We plan on merging this once eclipse-uprotocol/up-cpp#240 has been merged. All existing feedback will be captured as bugs to be addressed after the merge. This will allow multiple contributors to proceed in parallel using this code as a foundation.

attributes = attachmentToUAttributes(query.get_attachment());
}
auto id_str = serializer::uuid::AsString().serialize(attributes.id());
std::unique_lock<std::mutex> lock(query_map_mutex_);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to take the data structs that need thread safety, and turn them into classes holding the map or whatever and the mutex, and having accesor methods to do the locking? This style would make the main code much more readable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense, but before we should solve the question about cleanupListener signature, because if it will be as is (but not as rust one) we should rework these internals types.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sashacmc - I'll summarize what we discussed on cleanupListener over slack here so its available for everyone.

cleanupListener() is not a direct equivalent to unregisterListener() in other language libraries. Since we are using connection objects to represent listener callbacks, by the time cleanupListener() is called the connection is already broken. Calling a disconnected handle from the transport side will do nothing.

The purpose of cleanupListener() is purely informational - it exists to inform the transport implementation that a connection to a listener / callback function has been broken and that, depending on the details of the transport's implementation, it may need to schedule a cleanup operation. Since the connection to the callback function is already broken, this cleanup does not need to occur immediately. It can be deferred if that makes the cleanup easier.

Additionally, while the listener handle parameter passed to cleanupListener() can be compared against other listener handles to find matching handles, that is not strictly necessary. The CallerHandle class used there has both an isConnected() method and a boolean conversion operator that will return false if the handle is no longer connected. A transport implementation could just remove all unconnected handles after cleanupListener() is called.

if (auto resp_callback_it = rpc_callback_map_.find(source_str);
resp_callback_it == rpc_callback_map_.end()) {
return uError(UCode::UNAVAILABLE, "failed to find UUID");
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In relation to my prior comment about making a collection of thread safe data struct wrappers, this code shows the error object being created inside a locked region when it really shouldn't be. It seems better to me to have a thread-safe RpcCallbackMap class with an accessor like "[[nodiscard]] bool find(CallableConn& thing_to_capture)".

std::string data;
attributes.SerializeToString(&data);

res.push_back(std::make_pair("", version));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be emplace_back instead of push_back?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree

Updating up-cpp version, rewriting pub/sub test to use L2 APIs, updating
the transport to use the revised up-cpp interface from up-cpp#240, and
fixing some general bugs.

Also adds a zenoh config json for use with the tests.
@gregmedd gregmedd force-pushed the feature/1.5.8/zenoh-utransport-impl branch from 057e129 to dab0902 Compare July 26, 2024 19:02
@gregmedd gregmedd self-assigned this Jul 26, 2024
@gregmedd gregmedd force-pushed the feature/1.5.8/zenoh-utransport-impl branch 2 times, most recently from 2cc0f8b to 78de5e5 Compare July 27, 2024 01:42
The pre-release up-cpp has critical bugfixes required for this code to
build.

Also makes sure the test configs are captured in the build artifacts so
tests can run in CI.
We know that the current state of this implementaiton does not fully
work. Making this change is a compromise - it keeps the tests active and
running so we can see the output as we work on resolving issues, but
does not prevent us from merging early, incomplete code.

Once this implementation is more stable, we will re-enable this check
@gregmedd gregmedd force-pushed the feature/1.5.8/zenoh-utransport-impl branch from 78de5e5 to db76fd6 Compare July 27, 2024 01:46
@gregmedd gregmedd merged commit b701e35 into eclipse-uprotocol:main Jul 27, 2024
4 of 5 checks passed
@gregmedd gregmedd deleted the feature/1.5.8/zenoh-utransport-impl branch July 27, 2024 01:52
gregmedd added a commit that referenced this pull request Jul 29, 2024
Fix unittest and code cleanup:

* fix some notes from #65
* implement cleanupListener
* change listener storing key
* fix lambda transmission to zenoh
@gregmedd gregmedd linked an issue Jul 30, 2024 that may be closed by this pull request
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants